在昨天的文章中,我們探討了 Hash Join 的原理和實現,學習到 Hash Join 透過兩階段模型(Build Phase 和 Probe Phase)實現了 O(N + M) 的時間複雜度,然而 Hash Join 在以下場景效能會大幅下降:
1. 數據已排序
-- 兩個表都已經按 user_id 排序
SELECT *
FROM sorted_users u
JOIN sorted_orders o ON u.user_id = o.user_id
ORDER BY u.user_id;
如果數據已經排序,使用 Hash Join 意味著我們忽略了這個寶貴的排序信息。我們需要構建 Hash Table,消耗額外記憶體和 CPU 時間。
2. 記憶體嚴重受限
-- 兩個都是大表,記憶體只有 2GB
SELECT *
FROM huge_table1 t1 -- 10 億行,20 GB
JOIN huge_table2 t2 -- 8 億行,15 GB
ON t1.join_key = t2.join_key;
Hash Join 需要將較小的表(15 GB)完全載入記憶體構建 Hash Table。但如果可用記憶體只有 2GB,系統需要不斷的 spilling,性能急劇下降。
這就是 Sort-Merge Join 發揮作用的地方。今天我們將學習:
Sort-Merge Join 是基於 Merge Sort 的 Join 算法。使用它的前提是:兩個輸入都已按連接鍵排序。
基本思想:
已排序的左表: 已排序的右表:
user_id | name user_id | city
--------+------- --------+----------
1 | Alice 1 | Taipei
1 | Alice2 2 | Taichung
2 | Bob 2 | Kaohsiung
3 | Carol 4 | Tainan
5 | Dave
歸併過程:
1. 同時從兩個表的開頭開始掃描
2. 比較當前行的連接鍵
3. 如果相等,產生結果行
4. 如果不等,推進較小值的那一側
5. 重複直到至少一側耗盡
讓我們通過一個例子來理解:
初始狀態:
Left: [1, 1, 2, 3, 5] ← L 指針指向第一個 1
Right: [1, 2, 2, 4] ← R 指針指向第一個 1
步驟 1: 比較 L=1 vs R=1
✓ 相等! 產生結果: (1, Alice, Taipei)
發現 Left 還有另一個 1,也匹配: (1, Alice2, Taipei)
推進 R 指針(Right 沒有更多 1 了)
步驟 2: L=1(第二個) vs R=2
L < R,推進 L 指針
步驟 3: L=2 vs R=2
✓ 相等! 產生結果: (2, Bob, Taichung)
Right 還有另一個 2: (2, Bob, Kaohsiung)
推進 L 和 R 指針
步驟 4: L=3 vs R=4
L < R,推進 L 指針
步驟 5: L=5 vs R=4
L > R,推進 R 指針
步驟 6: R 已耗盡,結束
最終結果:
user_id | name | city
--------+--------+-----------
1 | Alice | Taipei
1 | Alice2 | Taipei
2 | Bob | Taichung
2 | Bob | Kaohsiung
相比 Hash Join 的優勢:
時間複雜度分析:
假設輸入已排序:
- 掃描左表: O(N)
- 掃描右表: O(M)
- 總時間: O(N + M) ← 與 Hash Join 相同!
假設輸入未排序:
- 排序左表: O(N log N)
- 排序右表: O(M log M)
- 歸併: O(N + M)
- 總時間: O(N log N + M log M) ← 排序開銷較大
關鍵洞察:
// datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
pub struct SortMergeJoinExec {
/// 左側輸入(已排序)
pub left: Arc<dyn ExecutionPlan>,
/// 右側輸入(已排序)
pub right: Arc<dyn ExecutionPlan>,
/// 連接條件
pub on: JoinOn,
/// 額外的過濾條件
pub filter: Option<JoinFilter>,
/// Join 類型
pub join_type: JoinType,
/// 左側的排序表達式
left_sort_exprs: LexOrdering,
/// 右側的排序表達式
right_sort_exprs: LexOrdering,
/// 排序選項(升序/降序、NULL 處理等)
pub sort_options: Vec<SortOptions>,
// ... 其他欄位
}
關鍵設計決策:
SortExec
left_sort_exprs
和 right_sort_exprs
記錄了預期的排序方式Sort-Merge Join 使用一個 Streamed 側和一個 Buffered 側:
Streamed 側:
- 逐行順序掃描
- 當前只保留少量行在記憶體中
- 不支援 spilling(因為數據量小)
Buffered 側:
- 緩衝所有具有相同連接鍵值的行
- 可能需要同時保留多個 batch
- 支援 spilling 到磁碟(如果記憶體不足)
為何需要 Buffering?
考慮有重複連接鍵的情況:
Streamed 側: Buffered 側:
user_id | name user_id | order_id
--------+------- --------+----------
1 | Alice 1 | 101
2 | Bob 1 | 102
1 | 103 ← 多個 user_id=1 的訂單
2 | 201
2 | 202
處理 Streamed user_id=1 (Alice) 時:
需要與 Buffered 側的所有 user_id=1 的行進行匹配
→ 必須緩衝 101, 102, 103 三行
→ 產生 3 個結果行
處理完 user_id=1 後:
可以釋放緩衝的 101, 102, 103
→ 記憶體被回收
哪一側是 Streamed/Buffered?
根據 Join 類型選擇:
// datafusion/physical-plan/src/joins/sort_merge_join/exec.rs
/// 決定哪一側作為 probe side (streamed)
fn probe_side(join_type: &JoinType) -> JoinSide {
match join_type {
JoinType::Inner | JoinType::Right | JoinType::RightSemi
| JoinType::RightAnti => {
JoinSide::Left // 左側作為 streamed
}
JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
JoinSide::Right // 右側作為 streamed
}
JoinType::Full => {
JoinSide::Left // 默認左側作為 streamed
}
}
}
選擇原則: 對於 LEFT OUTER JOIN,左側必須完整輸出,所以選擇左側作為 streamed 側(逐行處理,確保每行都被處理)。
讓我們深入理解歸併的狀態機:
// 簡化的 Sort-Merge Join 執行邏輯
async fn sort_merge_join(
mut streamed: SendableRecordBatchStream,
mut buffered: SendableRecordBatchStream,
join_type: JoinType,
) -> Result<Vec<RecordBatch>> {
let mut output = Vec::new();
let mut buffered_data = BufferedData::new();
// 讀取第一個 streamed batch
let mut streamed_batch = streamed.next().await?;
let mut streamed_idx = 0;
// 讀取第一個 buffered batch
let mut buffered_batch = buffered.next().await?;
let mut buffered_idx = 0;
loop {
// 如果任一側耗盡,根據 join 類型決定是否繼續
if streamed_batch.is_none() || buffered_batch.is_none() {
break;
}
let streamed_key = get_join_key(&streamed_batch, streamed_idx);
let buffered_key = get_join_key(&buffered_batch, buffered_idx);
match streamed_key.cmp(&buffered_key) {
Ordering::Equal => {
// 找到匹配!
// 緩衝所有 buffered 側具有相同鍵值的行
buffered_data.clear();
buffered_data.push(buffered_batch, buffered_idx);
// 繼續掃描 buffered 側,收集所有相同鍵值的行
loop {
buffered_idx += 1;
if buffered_idx >= buffered_batch.num_rows() {
// 當前 batch 耗盡,讀取下一個
buffered_batch = buffered.next().await?;
if buffered_batch.is_none() {
break;
}
buffered_idx = 0;
}
let next_key = get_join_key(&buffered_batch, buffered_idx);
if next_key != streamed_key {
break; // 不再相等,停止緩衝
}
buffered_data.push(buffered_batch, buffered_idx);
}
// 產生輸出: streamed 行 × buffered 中所有匹配的行
for buffered_row in buffered_data.rows() {
let output_row = join_rows(
&streamed_batch,
streamed_idx,
buffered_row
);
output.push(output_row);
}
// 推進 streamed 指針
streamed_idx += 1;
if streamed_idx >= streamed_batch.num_rows() {
streamed_batch = streamed.next().await?;
streamed_idx = 0;
}
}
Ordering::Less => {
// streamed_key < buffered_key
// Streamed 側的當前值沒有匹配,推進 streamed
if join_type == JoinType::Left {
// LEFT JOIN: 需要輸出未匹配的 streamed 行
output.push(join_with_nulls(&streamed_batch, streamed_idx));
}
streamed_idx += 1;
if streamed_idx >= streamed_batch.num_rows() {
streamed_batch = streamed.next().await?;
streamed_idx = 0;
}
}
Ordering::Greater => {
// streamed_key > buffered_key
// Buffered 側的當前值沒有匹配,推進 buffered
if join_type == JoinType::Right || join_type == JoinType::Full {
// RIGHT/FULL JOIN: 需要輸出未匹配的 buffered 行
output.push(join_with_nulls_buffered(&buffered_batch, buffered_idx));
}
buffered_idx += 1;
if buffered_idx >= buffered_batch.num_rows() {
buffered_batch = buffered.next().await?;
buffered_idx = 0;
}
}
}
}
Ok(output)
}
讓我們通過一個詳細例子理解緩衝機制:
Streamed 側: Buffered 側:
user_id | name user_id | product | amount
--------+------- --------+---------+--------
1 | Alice 1 | Book | 100
1 | Alice2 1 | Pen | 50
2 | Bob 1 | Paper | 30
3 | Carol 2 | Book | 200
2 | Pencil | 80
4 | Eraser | 20
執行過程:
【步驟 1】
Streamed: user_id=1 (Alice)
Buffered: user_id=1 (Book)
→ 相等!
開始緩衝 Buffered 側所有 user_id=1 的行:
buffered_data = [
(1, Book, 100),
(1, Pen, 50),
(1, Paper, 30)
]
產生輸出:
(1, Alice, Book, 100)
(1, Alice, Pen, 50)
(1, Alice, Paper, 30)
Buffered 指針現在指向 (2, Book, 200)
推進 Streamed 指針
【步驟 2】
Streamed: user_id=1 (Alice2)
Buffered: user_id=2 (Book)
→ Streamed < Buffered
但等等! buffered_data 中還有 user_id=1 的緩衝數據
→ 重用緩衝數據(無需重新掃描)
產生輸出:
(1, Alice2, Book, 100)
(1, Alice2, Pen, 50)
(1, Alice2, Paper, 30)
推進 Streamed 指針,釋放 buffered_data
【步驟 3】
Streamed: user_id=2 (Bob)
Buffered: user_id=2 (Book)
→ 相等!
緩衝 user_id=2 的行:
buffered_data = [
(2, Book, 200),
(2, Pencil, 80)
]
產生輸出:
(2, Bob, Book, 200)
(2, Bob, Pencil, 80)
推進兩側指針
【步驟 4】
Streamed: user_id=3 (Carol)
Buffered: user_id=4 (Eraser)
→ Streamed < Buffered
對於 INNER JOIN,不輸出
對於 LEFT JOIN,輸出 (3, Carol, NULL, NULL)
推進 Streamed 指針
【步驟 5】
Streamed 已耗盡,結束
關鍵洞察:
在討論 Join 策略選擇之前,我們還需要了解第三種 Join 算法:Nested Loop Join(嵌套循環連接)。
Nested Loop Join 用於沒有等值連接條件的情況:
-- 場景 1: 非等值連接
SELECT *
FROM products p
JOIN orders o ON o.price > p.cost * 1.2;
-- 場景 2: 複雜的連接條件
SELECT *
FROM employees e1
JOIN employees e2 ON e2.salary BETWEEN e1.salary * 0.8 AND e1.salary * 1.2;
-- 場景 3: CROSS JOIN + Filter
SELECT *
FROM table1 t1, table2 t2
WHERE t1.col1 + t2.col2 > 100;
這些查詢無法使用 Hash Join 或 Sort-Merge Join,因為:
DataFusion 的 NestedLoopJoinExec
實現:
// datafusion/physical-plan/src/joins/nested_loop_join.rs
/// Nested Loop Join: 適用於沒有等值連接條件的場景
///
/// 執行流程:
/// 1. 緩衝整個左側(Build 側)到記憶體
/// 2. 對右側(Probe 側)的每一行:
/// - 與左側的所有行進行笛卡爾積
/// - 評估 join filter
/// - 產生匹配的結果
pub struct NestedLoopJoinExec {
/// 左側(build 側)
pub(crate) left: Arc<dyn ExecutionPlan>,
/// 右側(probe 側)
pub(crate) right: Arc<dyn ExecutionPlan>,
/// 連接過濾條件(必須有,否則是純 CROSS JOIN)
pub(crate) filter: Option<JoinFilter>,
/// Join 類型
pub(crate) join_type: JoinType,
/// Future 用於非同步載入左側數據
build_side_data: OnceAsync<JoinLeftData>,
// ... 其他欄位
}
執行邏輯:
// 簡化的 Nested Loop Join 實現
async fn nested_loop_join(
build_input: SendableRecordBatchStream,
probe_input: SendableRecordBatchStream,
filter: JoinFilter,
) -> Result<Vec<RecordBatch>> {
// 階段 1: 緩衝整個 Build 側
let mut build_batches = Vec::new();
while let Some(batch) = build_input.next().await {
build_batches.push(batch?);
}
let build_data = concat_batches(&schema, &build_batches)?;
let mut output = Vec::new();
// 階段 2: 對每個 Probe batch
while let Some(probe_batch) = probe_input.next().await {
let probe_batch = probe_batch?;
// 對 Probe batch 的每一行
for probe_idx in 0..probe_batch.num_rows() {
// 與 Build 側的每一行進行組合
for build_idx in 0..build_data.num_rows() {
// 創建組合行
let combined = combine_rows(
&build_data, build_idx,
&probe_batch, probe_idx
);
// 評估 filter
if filter.evaluate(&combined)? {
output.push(combined);
}
}
}
}
Ok(output)
}
時間複雜度: O(N × M) - 每個 left 行都要與每個 right 行比較
適用場景:
現在我們了解了三種 Join 算法:
那麼,DataFusion 如何在它們之間做出選擇?
讓我們詳細分析 physical_planner.rs
中的決策流程:
// datafusion/core/src/physical_planner.rs
// 從 LogicalPlan::Join 生成物理計劃
LogicalPlan::Join(Join {
left, right, on, filter, join_type, ..
}) => {
let join_on = /* 提取等值連接鍵 */;
let join_filter = /* 提取非等值過濾條件 */;
// 決策樹開始
// 【決策 1】是否有等值連接鍵?
if join_on.is_empty() {
// 沒有等值連接鍵
if join_filter.is_none() && join_type == JoinType::Inner {
// 純 CROSS JOIN,使用專用的 CrossJoinExec
return CrossJoinExec::new(left, right);
} else {
// 有非等值條件,使用 NestedLoopJoin
return NestedLoopJoinExec::try_new(
left, right, join_filter, join_type
);
}
}
// 有等值連接鍵,繼續決策
// 讀取配置
let prefer_hash_join = config.prefer_hash_join; // 默認 true
let repartition_joins = config.repartition_joins;
let target_partitions = config.target_partitions;
// 【決策 2】是否啟用並行?
let can_parallel = target_partitions > 1 && repartition_joins;
if can_parallel && !prefer_hash_join {
// 【策略 A】啟用並行 + 偏好 SortMergeJoin
return SortMergeJoinExec::try_new(
left, right, join_on, join_filter, join_type
);
} else if can_parallel && prefer_hash_join {
// 【策略 B】啟用並行 + 偏好 HashJoin(最常見)
return HashJoinExec::try_new(
left, right, join_on, join_filter, join_type,
PartitionMode::Auto // 自動選擇分區策略
);
} else {
// 【策略 C】不並行,使用單分區 HashJoin
return HashJoinExec::try_new(
left, right, join_on, join_filter, join_type,
PartitionMode::CollectLeft // 收集 left 到單一分區
);
}
}
DataFusion 提供了幾個關鍵配置來控制 Join 策略:
// datafusion/common/src/config.rs
// 1. prefer_hash_join (默認: true)
/// 當設為 true 時,物理規劃器偏好 HashJoin 而非 SortMergeJoin
/// HashJoin 通常更高效,但消耗更多記憶體
pub prefer_hash_join: bool, default = true
// 2. repartition_joins (默認: true)
/// 是否允許為 Join 重新分區數據
/// 如果 false,則使用單分區模式
pub repartition_joins: bool, default = true
// 3. target_partitions (默認: CPU 核心數)
/// 目標並行度
pub target_partitions: usize, default = num_cpus
// 4. hash_join_single_partition_threshold (默認: 1MB)
/// HashJoin 使用單分區模式的閾值
/// 如果 Build 側小於此值,使用 CollectLeft 模式
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
物理規劃器生成初始計劃後,物理優化器會進一步優化:
優化 1: EnforceDistribution 規則
確保 Join 兩側滿足分區要求:
// 偽代碼
if join.mode == PartitionMode::Partitioned {
// SortMergeJoin 或 Partitioned HashJoin
// 需要兩側按相同的 hash 值分區
if !left.satisfies_hash_partitioning(join_keys) {
// 插入 RepartitionExec
left = RepartitionExec::try_new(
left,
Partitioning::Hash(join_keys, target_partitions)
);
}
if !right.satisfies_hash_partitioning(join_keys) {
right = RepartitionExec::try_new(
right,
Partitioning::Hash(join_keys, target_partitions)
);
}
}
優化 2: EnforceSorting 規則
如果選擇了 SortMergeJoin,確保輸入已排序:
// pseudocode
if let SortMergeJoinExec { left, right, on, .. } = join {
if !left.output_ordering().satisfies(on) {
// 插入 SortExec
left = SortExec::new(left, on);
}
if !right.output_ordering().satisfies(on) {
right = SortExec::new(right, on);
}
}
讓我們通過幾個實際場景來理解策略選擇:
案例 1: 典型的 OLAP 查詢
SELECT
p.product_name,
SUM(o.amount) as total_sales
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
GROUP BY p.product_name;
分析:
o.product_id = p.product_id
DataFusion 的選擇:
HashJoinExec (PartitionMode::CollectLeft)
├─ products (Build 側,收集到單分區)
│ └─ TableScan
└─ orders (Probe 側,保持分區)
└─ FilterExec (order_date >= '2024-01-01')
└─ TableScan
原因:
案例 2: 兩個大表 Join
SELECT *
FROM huge_fact_table1 f1
JOIN huge_fact_table2 f2 ON f1.key = f2.key;
分析:
DataFusion 的選擇:
HashJoinExec (PartitionMode::Partitioned)
├─ RepartitionExec (Hash(key), 16 partitions)
│ └─ fact_table1
└─ RepartitionExec (Hash(key), 16 partitions)
└─ fact_table2
原因:
案例 3: 已排序的時序數據
-- 兩個表都已按 timestamp 排序
SELECT *
FROM sensor_data_2023 s1
JOIN sensor_data_2024 s2
ON s1.sensor_id = s2.sensor_id
WHERE s1.timestamp BETWEEN '2023-12-01' AND '2023-12-31'
AND s2.timestamp BETWEEN '2024-01-01' AND '2024-01-31'
ORDER BY s1.sensor_id, s1.timestamp;
分析:
理想的計劃(如果設置 prefer_hash_join=false):
SortMergeJoinExec
├─ FilterExec (timestamp BETWEEN ...)
│ └─ TableScan (sensor_data_2023)
└─ FilterExec (timestamp BETWEEN ...)
└─ TableScan (sensor_data_2024)
優勢:
但實際上,由於 prefer_hash_join=true,DataFusion 可能仍選擇 HashJoin,除非用戶手動配置。
案例 4: 非等值連接
SELECT *
FROM employees e1
JOIN employees e2
ON e2.salary BETWEEN e1.min_salary AND e1.max_salary;
DataFusion 的選擇:
NestedLoopJoinExec
├─ employees (左側,較小)
└─ employees (右側)
原因: 沒有等值連接條件,只能使用 Nested Loop Join
今天我們完成了 Join 算子的完整探討,學習了三種核心 Join 算法及其選擇策略。
算法 | 核心優勢 | 主要限制 | 最佳場景 |
---|---|---|---|
Hash Join | 快速查找 O(1)高並行度 | 需要構建 Hash Table記憶體消耗較大 | 大多數等值連接記憶體充足數據未排序 |
Sort-Merge Join | 利用已有排序記憶體友善保持排序 | 需要預排序未排序時開銷大 | 數據已排序記憶體受限需要排序輸出 |
Nested Loop Join | 支援任意條件實現簡單 | O(N×M) 複雜度性能較差 | 非等值連接Build 側極小無其他選擇 |
prefer_hash_join=true
,適合大多數 OLAP 場景至此,Join 算子的介紹算是告一個段落了。明天,我們會轉向另一個主題:數據源整合 Part 1 - TableProvider 機制,探討 DataFusion 如何抽象和統一各種數據源的訪問。